Cloud workflowのワークフロー中で並列実行をしてみる
はじめに
データアナリティクス事業本部のkobayashiです。
GoogleCloudのWorkflowsで並列でStepを実行できるParallel Stepsを使ってみたのでまとめます。
Workflows release notes | Google Cloud
Parallel Stepsとは
ワークフロー中で一つの処理をStepとして実行しますが、Parallel Stepsはその名の通りStepを同時実行するステップになります。 並列ステップを使う場面としては複数の独立したAPIへリクエストを送りそのレスポンスを取得したいがレスポンスまでの時間が長くかかかる場合において、逐次処理に比べ並列ステップを使い複数のAPIへリクエストを同時に行うことで全体の処理時間を圧縮できるため重宝します。
Parallel steps | Workflows | Google Cloud
記述の仕方としては以下のようになります。
- {ステップ名}: parallel: # exception_policy: continueAll shared: [ ret_val ] # concurrency_limit: 10 for: value: test in: ${tests}
- {ステップ名}: parallel: # exception_policy: continueAll shared: [ ret_val ] # concurrency_limit: 10 branches: - branch_1: ... - branch_2: ...
並列ループと並列ブランチを使うパターンで処理を行うステップの記述が異なりますがそれ以外の属性は同じです。
各属性はexception_policy
は今日(2023/04/20)現在continueAll
しか設定がないので設定する必要はありません。またshared
は並列ステップ内で書き込み可能な変数を指定します。concurrency_limit
は同時に実行できる並列ステップ数になります。
実際の処理を行うステップは列ループの場合はfor
の配下に共通のステップを記述します。並列ブランチの場合はbranches
配下に子ステップを続けて記述していきます。使い分けですが、並列ループはある程度共通化した処理を行う場合に使います。具体的にはAPIに対してリクエストを行うがそのパラメータが違う場合などに使用できるかと思います。一方、並列ブランチは子ステップの処理が異なる場合に使います。
Parallel Stepsを使ってみる
では実際にParallel Stepsを使ってみたいと思います。今回はAPIへリクエストを送るのではなく、単純にsys.log
でログを書き出してみます。
並列ループ
はじめに並列ループを使ってみたいと思います。コードは以下になります。
main: steps: - assign_value: assign: - str_val: "str_val" - int_val: 999 - tests: [ "logging_1","logging_2","logging_3","logging_4","logging_5","logging_6","logging_7","logging_8","logging_9" ] - ret_val: [ ] - loggings: parallel: shared: [ ret_val ] for: value: test in: ${tests} steps: - logging_1: call: sys.log args: text: ${test + " " + "str_val:" + str_val + ", int_val:" + int_val } severity: INFO - append: assign: - ret_val: ${list.concat(ret_val,test)} - logging_end: call: sys.log args: text: ${ret_val} severity: INFO
行っていることは、loggings
を並列ステップとしてその配下で並列ループでtests
の中身をループ処理します。子ステップではステップ名の書き出しとステップ名を並列ステップのshared変数に追記します。すべての子ステップが完了した後にlogging_end
ステップでshared変数の中身をログに書き出します。
上記のコードをデプロイ&実行してみます。
$ gcloud workflows deploy test-parallel_for --source=parallel_for.yml --service-account ${SERVICE_ACCOUNT} --location ${DEFAULT_REGION} $ gcloud workflows run test-parallel_for --location asia-northeast1
Workflowsのコンソールでソース
からデプロイしたWorkflowを可視化したものを確認してみます。
実行ログを確認すると意図した通りtests
の中身をループ処理して子ステップ内でステップ名の書き出しとステップ名をshared変数に追記し、すべての子ステップが完了した後にlogging_end
ステップでshared変数の中身をログに書き出しています。
注意点として並列ステップなので必ずしもtests
に記述した順序で実行されることは無いという点です。実行順序を気にする場合は並列ステップではなく逐次にステップを記述する必要があります。
2023-04-20 14:36:07.551 JST {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2023-04-20T05:36:07Z, start: {…}, state: ACTIVE} 2023-04-20 14:36:07.733 JST logging_7 str_val:str_val, int_val:999 2023-04-20 14:36:07.767 JST logging_1 str_val:str_val, int_val:999 2023-04-20 14:36:07.800 JST logging_2 str_val:str_val, int_val:999 2023-04-20 14:36:07.861 JST logging_3 str_val:str_val, int_val:999 2023-04-20 14:36:07.887 JST logging_4 str_val:str_val, int_val:999 2023-04-20 14:36:07.979 JST logging_9 str_val:str_val, int_val:999 2023-04-20 14:36:08.008 JST logging_5 str_val:str_val, int_val:999 2023-04-20 14:36:08.391 JST ["logging_8","logging_2","logging_1","logging_7","logging_9","logging_5","logging_6","logging_4","logging_3"] 2023-04-20 14:36:08.560 JST {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2023-04-20T05:36:08Z, state: SUCCEEDED, success: {…}}
並列ブランチ
次に並列ブランチを使ってみたいと思います。コードは以下になります。
main: steps: - assign_value: assign: - str_val: "str_val" - int_val: 999 - ret_val: [] - loggings: parallel: shared: [ ret_val ] branches: - logging_1_branch: steps: - logging_1: call: sys.log args: text: ${"logging_1" + " " + "str_val:" + str_val + ", int_val:" + int_val } severity: INFO - logging_1_sleep: call: sys.sleep args: seconds: 10 - logging_1_2: call: sys.log args: text: ${"logging_1_2" + " " + "str_val:" + str_val + ", int_val:" + int_val } severity: INFO - logging_1_append: assign: - ret_val: ${list.concat(ret_val,"logging_1")} - logging_2_branch: steps: - logging_2: call: sys.log args: text: ${"logging_2" + " " + "str_val:" + str_val + ", int_val:" + int_val } severity: INFO - logging_2_append: assign: - ret_val: ${list.concat(ret_val,"logging_2")} - logging_3_branch: steps: - logging_3: call: sys.log args: text: ${"logging_3" + " " + "str_val:" + str_val + ", int_val:" + int_val } severity: INFO - logging_3_append: assign: - ret_val: ${list.concat(ret_val,"logging_3")} - logging_4_branch: steps: - logging_4: call: sys.log args: text: ${"logging_4" + " " + "str_val:" + str_val + ", int_val:" + int_val } severity: INFO - logging_4_append: assign: - ret_val: ${list.concat(ret_val,"logging_4")} - logging_end: call: sys.log args: text: ${ret_val} severity: INFO
行っていることは、loggings
を並列ステップとしてその配下で4つのステップを並列実行します。子ステップではステップ名の書き出しとステップ名を並列ステップのshared変数に追記します。すべての子ステップが完了した後にlogging_end
ステップでshared変数の中身をログに書き出します。
またlogging_1_branch
では途中でSleep処理を入れて10秒待機することで他のステップと処理を変えています。
上記のコードをデプロイ&実行してみます。
$ gcloud workflows deploy test-parallel_branch --source=parallel_branch.yml --service-account ${SERVICE_ACCOUNT} --location ${DEFAULT_REGION} $ gcloud workflows run test-parallel_branch --location asia-northeast1
Workflowsのコンソールでソース
からデプロイしたWorkflowを可視化したものを確認してみます。
logging_1_branch
ステップのみ内容の異なった並列処理になっていることがわかります。
実行ログを確認すると意図した通りlogging_1_branch
ステップでは最初のログを書き出した後に10秒待機してから次のログを書き出し、すべての子ステップの完了を待ってからshared変数の中身をログに書き出されていることがわかります。
こちらも並列ループと同様に必ずしも子ステップが記述順に実行されるわけではないということがわかります。
2023-04-20 14:53:25.844 JST {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2023-04-20T05:53:25Z, start: {…}, state: ACTIVE} 2023-04-20 14:53:26.018 JST logging_4 str_val:str_val, int_val:999 2023-04-20 14:53:26.062 JST logging_2 str_val:str_val, int_val:999 2023-04-20 14:53:26.073 JST logging_1 str_val:str_val, int_val:999 2023-04-20 14:53:26.120 JST logging_3 str_val:str_val, int_val:999 2023-04-20 14:53:36.493 JST logging_1_2 str_val:str_val, int_val:999 2023-04-20 14:53:36.792 JST ["logging_4","logging_2","logging_3","logging_1"] 2023-04-20 14:53:36.966 JST {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2023-04-20T05:53:36Z, state: SUCCEEDED, success: {…}}
まとめ
GoogleCloudのWorkflowsで並列でStepを実行できるParallel Stepsを使ってみました。並列処理が行える処理はParallel Stepsを使うことで処理時間を短縮できるので使わない手はないと思います。ただし実行数、深度、メモリ等の上限があります(割り当てと上限 | ワークフロー | Google Cloud )のでその点は注意して使いましょう。
最後まで読んで頂いてありがとうございました。